package transform;

import bean.SensorReading;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Description: TODO QQ1667847363
 * @author: xiao kun tai
 * @date:2021/11/3 10:08
 */
public class Transform3_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        String inputPath = "src/main/resources/sensor.txt";

        DataStream<String> inputStream = env.readTextFile(inputPath);


        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        /***
         * 分组
         */

        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");


        /**
         * 聚合,
         * 取 当前时间戳 的最大值
         */
        DataStream<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading sensorReading, SensorReading t1) throws Exception {
                return new SensorReading(sensorReading.getId(), t1.getTimestamp(), Math.max(sensorReading.getTemperature(), t1.getTemperature()));
            }
        });
        DataStream<SensorReading> resultStream1 = keyedStream.reduce((curState, newData) -> {
            return new SensorReading(curState.getId(), newData.getTimestamp(), Math.max(curState.getTemperature(), newData.getTemperature()));

        });


        resultStream.print("max");
        env.execute("");


    }
}
